| 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317 |
2x
2x
2x
2x
2x
2x
2x
2x
2x
2x
617x
617x
617x
617x
617x
617x
2x
617x
617x
617x
617x
617x
617x
2x
53x
2x
579x
543x
543x
617x
617x
74x
74x
74x
74x
74x
74x
2x
617x
617x
617x
617x
2x
2x
607x
607x
607x
9244x
9244x
9243x
9242x
9242x
1x
1x
1x
1x
1x
1x
9241x
9241x
9243x
9243x
617x
617x
617x
617x
617x
617x
617x
2x
9861x
1x
2x
2x
1305x
2x
671x
53x
53x
53x
53x
53x
53x
53x
50x
53x
2x
617x
617x
617x
2x
| /**
* Copyright 2017 Google Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import { assert, fail } from './assert';
import * as log from './log';
import { AnyJs } from './misc';
import { Deferred, CancelablePromise } from './promise';
import { Code, FirestoreError } from './error';
// tslint:disable-next-line:no-any Accept any return type from setTimeout().
type TimerHandle = any;
/**
* Wellknown "timer" IDs used when scheduling delayed operations on the
* AsyncQueue. These IDs can then be used from tests to check for the presence
* of operations or to run them early.
*/
export enum TimerId {
ListenStreamIdle,
ListenStreamConnection,
WriteStreamIdle,
WriteStreamConnection
}
/**
* Represents an operation scheduled to be run in the future on an AsyncQueue.
*
* It is created via DelayedOperation.createAndSchedule().
*
* Supports cancellation (via cancel()) and early execution (via skipDelay()).
*/
class DelayedOperation<T> implements CancelablePromise<T> {
// handle for use with clearTimeout(), or null if the operation has been
// executed or canceled already.
private timerHandle: TimerHandle | null;
private readonly deferred = new Deferred<T>();
private constructor(
private readonly asyncQueue: AsyncQueue,
readonly timerId: TimerId,
readonly targetTimeMs: number,
private readonly op: () => Promise<T>,
private readonly removalCallback: (op: DelayedOperation<T>) => void
) {}
/**
* Creates and returns a DelayedOperation that has been scheduled to be
* executed on the provided asyncQueue after the provided delayMs.
*
* @param asyncQueue The queue to schedule the operation on.
* @param id A Timer ID identifying the type of operation this is.
* @param delayMs The delay (ms) before the operation should be scheduled.
* @param op The operation to run.
* @param removalCallback A callback to be called synchronously once the
* operation is executed or canceled, notifying the AsyncQueue to remove it
* from its delayedOperations list.
* PORTING NOTE: This exists to prevent making removeDelayedOperation() and
* the DelayedOperation class public.
*/
static createAndSchedule<T>(
asyncQueue: AsyncQueue,
timerId: TimerId,
delayMs: number,
op: () => Promise<T>,
removalCallback: (op: DelayedOperation<T>) => void
): DelayedOperation<T> {
const targetTime = Date.now() + delayMs;
const delayedOp = new DelayedOperation(
asyncQueue,
timerId,
targetTime,
op,
removalCallback
);
delayedOp.start(delayMs);
return delayedOp;
}
/**
* Starts the timer. This is called immediately after construction by
* createAndSchedule().
*/
private start(delayMs: number): void {
this.timerHandle = setTimeout(() => this.handleDelayElapsed(), delayMs);
}
/**
* Queues the operation to run immediately (if it hasn't already been run or
* canceled).
*/
skipDelay(): void {
return this.handleDelayElapsed();
}
/**
* Cancels the operation if it hasn't already been executed or canceled. The
* promise will be rejected.
*
* As long as the operation has not yet been run, calling cancel() provides a
* guarantee that the operation will not be run.
*/
cancel(reason?: string): void {
if (this.timerHandle !== null) {
this.clearTimeout();
this.deferred.reject(
new FirestoreError(
Code.CANCELLED,
'Operation cancelled' + (reason ? ': ' + reason : '')
)
);
}
}
// Promise implementation.
readonly [Symbol.toStringTag]: 'Promise';
then = this.deferred.promise.then.bind(this.deferred.promise);
catch = this.deferred.promise.catch.bind(this.deferred.promise);
private handleDelayElapsed(): void {
this.asyncQueue.enqueue(() => {
Eif (this.timerHandle !== null) {
this.clearTimeout();
return this.op().then(result => {
return this.deferred.resolve(result);
});
} else {
return Promise.resolve();
}
});
}
private clearTimeout() {
Eif (this.timerHandle !== null) {
this.removalCallback(this);
clearTimeout(this.timerHandle);
this.timerHandle = null;
}
}
}
export class AsyncQueue {
// The last promise in the queue.
private tail: Promise<AnyJs | void> = Promise.resolve();
// Operations scheduled to be queued in the future. Operations are
// automatically removed after they are run or canceled.
private delayedOperations: Array<DelayedOperation<AnyJs>> = [];
// visible for testing
failure: Error;
// Flag set while there's an outstanding AsyncQueue operation, used for
// assertion sanity-checks.
private operationInProgress = false;
/**
* Adds a new operation to the queue. Returns a promise that will be resolved
* when the promise returned by the new operation is (with its value).
*/
enqueue<T>(op: () => Promise<T>): Promise<T> {
this.verifyNotFailed();
const newTail = this.tail.then(() => {
this.operationInProgress = true;
return op()
.catch(error => {
this.failure = error;
this.operationInProgress = false;
const message = error.stack || error.message || '';
log.error('INTERNAL UNHANDLED ERROR: ', message);
// Escape the promise chain and throw the error globally so that
// e.g. any global crash reporting library detects and reports it.
// (but not for simulated errors in our tests since this breaks mocha)
Iif (message.indexOf('Firestore Test Simulated Error') < 0) {
setTimeout(() => {
throw error;
}, 0);
}
// Re-throw the error so that this.tail becomes a rejected Promise and
// all further attempts to chain (via .then) will just short-circuit
// and return the rejected Promise.
throw error;
})
.then(result => {
this.operationInProgress = false;
return result;
});
});
this.tail = newTail;
return newTail;
}
/**
* Schedules an operation to be queued on the AsyncQueue once the specified
* `delayMs` has elapsed. The returned CancelablePromise can be used to cancel
* the operation prior to its running.
*/
enqueueAfterDelay<T>(
timerId: TimerId,
delayMs: number,
op: () => Promise<T>
): CancelablePromise<T> {
this.verifyNotFailed();
// While not necessarily harmful, we currently don't expect to have multiple
// ops with the same timer id in the queue, so defensively reject them.
assert(
!this.containsDelayedOperation(timerId),
`Attempted to schedule multiple operations with timer id ${
TimerId[timerId]
}.`
);
const delayedOp = DelayedOperation.createAndSchedule(
this,
timerId,
delayMs,
op,
op => this.removeDelayedOperation(op)
);
this.delayedOperations.push(delayedOp);
return delayedOp;
}
private verifyNotFailed(): void {
if (this.failure) {
fail(
'AsyncQueue is already failed: ' +
(this.failure.stack || this.failure.message)
);
}
}
/**
* Verifies there's an operation currently in-progress on the AsyncQueue.
* Unfortunately we can't verify that the running code is in the promise chain
* of that operation, so this isn't a foolproof check, but it should be enough
* to catch some bugs.
*/
verifyOperationInProgress(): void {
assert(
this.operationInProgress,
'verifyOpInProgress() called when no op in progress on this queue.'
);
}
/**
* Waits until all currently queued tasks are finished executing. Delayed
* operations are not run.
*/
drain(): Promise<void> {
return this.enqueue(() => Promise.resolve());
}
/**
* For Tests: Determine if a delayed operation with a particular TimerId
* exists.
*/
containsDelayedOperation(timerId: TimerId): boolean {
return this.delayedOperations.findIndex(op => op.timerId === timerId) >= 0;
}
/**
* For Tests: Runs some or all delayed operations early.
*
* @param lastTimerId If specified, only delayed operations up to and
* including this TimerId will be drained. Throws if no such operation
* exists.
* @returns a Promise that resolves once all operations have been run.
*/
runDelayedOperationsEarly(lastTimerId?: TimerId): Promise<void> {
// Note that draining may generate more delayed ops, so we do that first.
return this.drain().then(() => {
assert(
lastTimerId === undefined || this.containsDelayedOperation(lastTimerId),
`Attempted to drain to missing operation ${lastTimerId}`
);
// Run ops in the same order they'd run if they ran naturally.
this.delayedOperations.sort((a, b) => a.targetTimeMs - b.targetTimeMs);
for (const op of this.delayedOperations) {
op.skipDelay();
if (lastTimerId !== undefined && op.timerId === lastTimerId) {
break;
}
}
return this.drain();
});
}
/** Called once a DelayedOperation is run or canceled. */
private removeDelayedOperation<T>(op: DelayedOperation<T>) {
// NOTE: indexOf / slice are O(n), but delayedOperations is expected to be small.
const index = this.delayedOperations.indexOf(op);
assert(index >= 0, 'Delayed operation not found.');
this.delayedOperations.splice(index, 1);
}
}
|